#!/usr/bin/env python
#
# OpenSlide, a library for reading whole slide image files
#
# Copyright (c) 2012-2015 Carnegie Mellon University
# Copyright (c) 2015-2021 Benjamin Gilbert
# All rights reserved.
#
# OpenSlide is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, version 2.1.
#
# OpenSlide is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with OpenSlide. If not, see
# <http://www.gnu.org/licenses/>.
#

from collections import defaultdict
from configparser import RawConfigParser
from contextlib import closing, contextmanager
import errno
import filecmp
import fnmatch
from hashlib import sha256
import inspect
from lzma import LZMACompressor
import os
import re
import requests
import shlex
from shutil import copytree, rmtree
import socket
import subprocess
import sys
import tarfile
from tempfile import (mkdtemp, TemporaryDirectory, TemporaryFile,
        NamedTemporaryFile)
import textwrap
from threading import Thread
from time import time as curtime
from urllib.parse import urljoin
import yaml
from zipfile import ZipFile

# Base URL that slide names are joined onto when fetching pristine slides
TESTDATA_URL = 'http://openslide.cs.cmu.edu/download/openslide-testdata/'
# Default S3 bucket for the freeze command
DEFAULT_FROZEN_BUCKET = 'openslide-frozen-testdata'
# NOTE(review): the !!SRCDIR!!, !!BUILDDIR!!, and !!FEATURES!! markers below
# look like placeholders substituted by the build system -- TODO confirm
VALGRIND_SUPPRESSIONS = '!!SRCDIR!!/valgrind.supp'
# Directory holding one subdirectory per test case
CASEROOT = '!!SRCDIR!!/cases'
# YAML map of base slide name -> expected SHA-256
SLIDELIST = '!!SRCDIR!!/cases/slides.yaml'
# YAML manifest (url, sha256) of the current frozen archive
FROZENLIST = '!!SRCDIR!!/cases/frozen.yaml'
MOSAICLIST = '!!SRCDIR!!/cases/mosaic.ini'
# Directory into which test cases are unpacked
WORKROOT = '!!BUILDDIR!!/_slidedata/unpacked'
# Directory holding downloaded, unmodified base slides
PRISTINE = '!!BUILDDIR!!/_slidedata/pristine'
# Mountpoint for the FUSE filesystem used by the freeze command
FUSEMOUNT = '!!BUILDDIR!!/_slidedata/fuse'
# Parent directory for frozen archives and their temporary files
FROZENBASE = '!!BUILDDIR!!/_slidedata'
# Symlink to the currently unpacked frozen archive directory
FROZEN = '!!BUILDDIR!!/_slidedata/frozen'
# Set of feature names; tests listing a 'requires' entry not in this set
# are skipped
FEATURES = set('!!FEATURES!!'.split())
# Name of the per-test configuration file inside each test directory
TESTCONF = 'config.yaml'

# ANSI escape sequences used by _color()
GREEN = '\033[1;32m'
BLUE = '\033[1;34m'
RED = '\033[1;31m'
RESET = '\033[1;0m'

# Names of registered user commands, in registration order, and a map from
# name to function; both are filled in by the _command decorator
_commands = []
_command_funcs = {}

# Sentinel meaning "do not check this"; distinct from None, which is itself
# a meaningful vendor value in _try_open_slide()
SKIP = object()


# NOTE(review): '!!CYGWIN_CROSS_TEST!!' looks like a build-time placeholder;
# any non-empty replacement selects the Cygwin branch -- TODO confirm what
# the build system substitutes
if '!!CYGWIN_CROSS_TEST!!':
    import ctypes

    # Bind cygwin_conv_path() from the Cygwin runtime, capturing errno so
    # failures can be reported as OSError
    _cygwin = ctypes.CDLL('cygwin1.dll', use_errno=True)
    _cygwin_conv_path = _cygwin.cygwin_conv_path
    _cygwin_conv_path.argtypes = [ctypes.c_uint, ctypes.c_void_p,
            ctypes.c_void_p, ctypes.c_size_t]
    _cygwin_conv_path.restype = ctypes.c_ssize_t

    def native_path(path):
        '''Convert path with cygwin_conv_path() and return the contents of
        the converted buffer.  Raise OSError if the conversion fails.'''
        flags = 0x100  # CCP_POSIX_TO_WIN_A | CCP_RELATIVE
        # First call passes no buffer; its return value is used as the
        # size of the buffer to allocate
        size = _cygwin_conv_path(flags, path, None, 0)
        if size == -1:
            raise OSError(ctypes.get_errno(), "Couldn't convert path")
        buf = ctypes.create_string_buffer(size)
        if _cygwin_conv_path(flags, path, buf, size) == -1:
            raise OSError(ctypes.get_errno(), "Couldn't convert path")
        return buf.value

    def symlink(src, dst):
        '''Create a symlink at dst pointing to src by running the symlink
        helper program from the build directory.  A relative src is first
        made absolute relative to the directory containing dst.'''
        if not os.path.isabs(src):
            src = os.path.abspath(os.path.join(os.path.dirname(dst), src))
        subprocess.check_call(['!!BUILDDIR!!/symlink', native_path(src),
                native_path(dst)])
else:
    # No path translation needed: paths pass through unchanged and
    # symlinks are created directly
    native_path = lambda p: p
    symlink = os.symlink


class ConnectionInterrupted(Exception):
    '''Raised when a download times out or ends before all data arrives.'''


def _command(f):
    '''Decorator that registers f, under its own name, as a user command
    and returns it unchanged.'''
    name = f.__name__
    _command_funcs[name] = f
    _commands.append(name)
    return f


def _color(color, str):
    '''Return str prefixed with the given ANSI color escape sequence and
    followed by the reset sequence.'''
    return ''.join((color, str, RESET))


def _list_tests(pattern='*'):
    '''Return the sorted names of the test directories under CASEROOT that
    match the specified glob pattern and contain a test config file.'''
    matches = []
    for entry in sorted(os.listdir(CASEROOT)):
        if not fnmatch.fnmatch(entry, pattern):
            continue
        if os.path.exists(os.path.join(CASEROOT, entry, TESTCONF)):
            matches.append(entry)
    return matches


def _list_slide_files(slide):
    '''Return the paths of all non-directory entries below the pristine
    copy of a slide, relative to the slide's directory.  slide is e.g.
    "Mirax/CMU-1.zip".'''
    def collect(directory):
        found = []
        for entry in os.listdir(directory):
            full = os.path.join(directory, entry)
            if not os.path.isdir(full):
                found.append(entry)
                continue
            # Recurse, prefixing results with the subdirectory name
            for sub in collect(full):
                found.append(os.path.join(entry, sub))
        return found
    return collect(os.path.join(PRISTINE, slide))


def _load_test_config(testname):
    '''Load the specified test's config.yaml and return the parsed data.'''
    confpath = os.path.join(CASEROOT, testname, TESTCONF)
    with open(confpath) as conffile:
        return yaml.safe_load(conffile)


def _features_available(conf):
    '''Return True if every feature listed under "requires" in the test
    configuration is present in FEATURES, False otherwise.  A configuration
    with no "requires" entry needs nothing.'''
    return all(feature in FEATURES for feature in conf.get('requires', []))


def _launch_test(test, slidefile, valgrind=False, extra_checks=True,
        testdir=None, debug=[], args=[], **kwargs):
    '''Spawn the named test program from testdir (the build directory if
    unspecified) against slidefile and return the Popen instance.

    valgrind wraps the program in Valgrind via libtool.  extra_checks
    enables allocator debugging in the environment; pass False for
    benchmarks, where that instrumentation would distort results.  debug is
    a list of options joined into OPENSLIDE_DEBUG.  args are appended to
    the command line.  kwargs are forwarded to the Popen constructor.'''

    if testdir is None:
        testdir = '!!BUILDDIR!!'

    child_env = dict(os.environ)
    child_env['G_MESSAGES_DEBUG'] = ''
    child_env['OPENSLIDE_DEBUG'] = ','.join(debug)
    child_env['GIO_USE_VFS'] = 'local'
    if extra_checks:
        child_env['G_DEBUG'] = 'gc-friendly'
        # debug-blocks retains pointers to freed slices, so don't use it
        # with Valgrind
        child_env['G_SLICE'] = ('always-malloc' if valgrind
                else 'always-malloc,debug-blocks')
        child_env['MALLOC_CHECK_'] = '1'

    cmdline = [os.path.join(testdir, test), native_path(slidefile)] + args
    if valgrind:
        wrapper = [os.path.join(testdir, '../libtool'), '--mode=execute',
                'valgrind', '--quiet', '--error-exitcode=3',
                '--suppressions=' + VALGRIND_SUPPRESSIONS,
                '--leak-check=full', '--num-callers=30']
        cmdline = wrapper + cmdline
    return subprocess.Popen(cmdline, env=child_env, **kwargs)


def _try_open_slide(slidefile, valgrind=False, testdir=None, debug=[],
        vendor=SKIP, properties={}, regions=[]):
    '''Run the try_open test program from testdir against slidefile, under
    Valgrind if requested.  Return None on success, or an error message on
    failure.

    vendor is the vendor string openslide_detect_vendor() should return,
    None for NULL, or SKIP to omit the check.  properties maps slide
    property names to expected values.  regions is a list of region tuples
    (x, y, level, w, h).  debug is a list of OPENSLIDE_DEBUG options.

    The program's combined output is the error message.  If it exits with
    a positive status without printing anything, the message is the empty
    string; a negative status without output is reported by number.'''

    cmd_args = []
    if vendor is not SKIP:
        cmd_args += ['-n', vendor if vendor is not None else 'none']
    for key, value in properties.items():
        cmd_args += ['-p', '='.join((key, value or ''))]
    for region in regions:
        cmd_args += ['-r', ' '.join(map(str, region))]

    proc = _launch_test('try_open', slidefile, valgrind=valgrind,
            args=cmd_args, testdir=testdir, debug=debug,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    stdout, stderr = proc.communicate()
    status = proc.returncode
    if stdout or stderr or status > 0:
        return (stdout + stderr).strip()
    if status:
        return f'Exited with status {status}'
    return None


def _try_extended(slidefile, valgrind=False, testdir=None):
    '''Run the extended test program from testdir against slidefile, under
    Valgrind if requested.  Return None on success.  On failure return the
    program's combined output, or its exit status if it printed nothing.'''

    proc = _launch_test('extended', slidefile, valgrind=valgrind,
            testdir=testdir, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            text=True)
    stdout, stderr = proc.communicate()
    if stdout or stderr:
        return (stdout + stderr).strip()
    status = proc.returncode
    if status:
        return f'Exited with status {status}'
    return None


def _download(url, name, fh):
    '''Download the specified URL, write to the specified file handle, and
    return the SHA-256 of the data as a hex string.  name is used only in
    progress messages.

    Raise ConnectionInterrupted on a timeout or short read while the body
    is being received.  Errors from the initial request, including HTTP
    error statuses, propagate as requests exceptions.  The response must
    carry a Content-Length header.'''

    print(f'Fetching {name}...\r', end='')
    sys.stdout.flush()
    r = requests.get(url, stream=True, timeout=120)
    try:
        r.raise_for_status()

        cur = 0
        last_update = 0
        size = int(r.headers['Content-Length'])
        hasher = sha256()

        try:
            for chunk in r.iter_content(128 << 10):
                fh.write(chunk)
                hasher.update(chunk)

                cur += len(chunk)
                # Refresh the progress line at most once per second
                now = curtime()
                if now - last_update >= 1:
                    print(f'Fetching {name} ({cur >> 20}/{size >> 20} MB)...\r',
                            end='')
                    sys.stdout.flush()
                    last_update = now
            if cur != size:
                # Stream ended cleanly but early
                raise ConnectionInterrupted
        except (ConnectionInterrupted,
                requests.exceptions.Timeout,
                socket.timeout):
            print('{:<79}'.format(f'Failure fetching {name} ({cur >> 20}/{size >> 20} MB)'))
            raise ConnectionInterrupted
        else:
            print('{:<79}'.format(f'Fetched {name} ({size >> 20} MB)'))
            return hasher.hexdigest()
    finally:
        # With stream=True the connection is held until the body is fully
        # consumed or the response is closed, so close explicitly to
        # release it on the failure paths too
        r.close()


def _fetch_one(slide):
    '''Ensure the pristine copy of the base slide exists, downloading it
    (and unpacking it, for .zip slides) if necessary.

    The download is verified against the SHA-256 recorded in SLIDELIST and
    retried up to four times when the connection is interrupted.  Raise
    ValueError if the slide is not listed or its hash does not match.  On
    any failure the partially populated destination is removed.'''

    destpath = os.path.join(PRISTINE, slide)
    if os.path.exists(destpath):
        return

    with open(SLIDELIST) as listfile:
        known = yaml.safe_load(listfile)
    if slide not in known:
        raise ValueError(f'{slide} not in {SLIDELIST}')

    filename = os.path.basename(slide)
    url = urljoin(TESTDATA_URL, slide)
    _, extension = os.path.splitext(filename)
    is_zip = extension == '.zip'

    os.makedirs(destpath)
    try:
        # Zip archives are downloaded to scratch space and extracted;
        # anything else is written straight into place
        dest = (TemporaryFile() if is_zip
                else open(os.path.join(destpath, filename), 'wb'))

        with dest:
            retries_remaining = 4
            while True:
                try:
                    digest = _download(url, slide, dest)
                except ConnectionInterrupted:
                    if not retries_remaining:
                        raise
                    retries_remaining -= 1
                    print('Retrying...')
                    # Discard the partial data before trying again
                    dest.seek(0)
                    dest.truncate()
                    continue
                break

            if digest != known[slide]:
                raise ValueError(f'Hash mismatch: {slide}')

            if is_zip:
                print(f'Unpacking {slide}...')
                with ZipFile(dest) as archive:
                    archive.extractall(path=destpath)
    except BaseException:
        rmtree(destpath, ignore_errors=True)
        raise

@_command
def create(slide, testname):
    '''Create a new test case with the specified name and base slide (e.g.
    "Mirax/CMU-1.zip").'''

    testpath = os.path.join(CASEROOT, testname)
    if os.path.exists(testpath):
        raise ValueError('A test with that name already exists')
    _fetch_one(slide)

    print(f'Creating test {testname} for {slide}')

    # Find the first file within the slide that the library can open
    srcpath = os.path.join(PRISTINE, slide)
    slidefile = slidepath = None
    for candidate in _list_slide_files(slide):
        candidate_path = os.path.join(srcpath, candidate)
        if _try_open_slide(candidate_path) is None:
            slidefile, slidepath = candidate, candidate_path
            break
    if slidefile is None:
        raise IOError('Could not locate readable slide file')

    # Record the vendor the library currently detects for that file
    query = _launch_test('query', slidepath, args=['-n'],
            stdout=subprocess.PIPE, text=True)
    output, _ = query.communicate()
    if query.returncode:
        raise IOError('Could not query slide vendor')
    vendor = output.strip() or None

    # Copy the slide into the new test directory and write a starting
    # configuration for the user to edit
    os.mkdir(testpath)
    copytree(srcpath, os.path.join(testpath, 'slide'))
    conf = {
        'success': False,
        'error': '^$',
        'base': slide,
        'slide': slidefile,
        'vendor': vendor,
    }
    with open(os.path.join(testpath, TESTCONF), 'w') as conffile:
        yaml.safe_dump(conf, conffile, default_flow_style=False)


@_command
def pack(testname):
    '''Pack a newly-created test case for checkin.'''

    testdir = os.path.join(CASEROOT, testname)
    if not os.path.exists(os.path.join(testdir, TESTCONF)):
        raise ValueError('Test does not exist')

    print(f'Packing {testname}...')
    slide = _load_test_config(testname)['base']

    delta_bytes = 0
    for relpath in _list_slide_files(slide):
        basename = os.path.basename(relpath)
        origpath = os.path.join(PRISTINE, slide, relpath)
        newpath = os.path.join(testdir, 'slide', relpath)
        deltapath = os.path.join(testdir, basename + '.xdelta')
        whiteoutpath = os.path.join(testdir, basename + '.whiteout')

        # Refuse to overwrite the results of an earlier pack
        for path in (deltapath, whiteoutpath):
            if os.path.exists(path):
                raise IOError(f'{path} already exists')

        if not os.path.exists(newpath):
            # File was deleted from the test case; record that with an
            # empty marker file
            with open(whiteoutpath, 'w'):
                pass
        elif not filecmp.cmp(origpath, newpath, shallow=False):
            # File was modified; store only the difference from pristine
            subprocess.check_call(['xdelta3', 'encode', '-9', '-W',
                    '16777216', '-S', 'none', '-s', origpath, newpath,
                    deltapath])
            delta_bytes += os.stat(deltapath).st_size

    rmtree(os.path.join(testdir, 'slide'))

    delta_kb = delta_bytes >> 10
    if delta_kb:
        print(f'Delta: {delta_kb} KB')
    else:
        print(f'Delta: {delta_bytes} bytes')


def _run_generator(command_str, inpath, outpath):
    '''Run the specified generator pipeline: "|"-separated commands, each
    split shell-style, with every argument %-formatted against the keys
    "in" (inpath) and "out" (outpath).  Each command's stdout is piped to
    the next command's stdin.  Raise IOError reporting the first nonzero
    exit status, if any.'''

    stages = command_str.split('|')
    substitutions = {'in': inpath, 'out': outpath}
    children = []
    stdins = []
    stdouts = []
    try:
        # The first stage inherits our stdin and the last our stdout;
        # adjacent stages are joined by a pipe.  Descriptors are recorded
        # as they are created so the cleanup below sees all of them.
        stdins.append(None)
        for _ in stages[1:]:
            read_end, write_end = os.pipe()
            stdouts.append(write_end)
            stdins.append(read_end)
        stdouts.append(None)

        for index, stage in enumerate(stages):
            argv = [arg % substitutions for arg in shlex.split(stage)]
            children.append(subprocess.Popen(argv, stdin=stdins[index],
                    stdout=stdouts[index], close_fds=True))
    finally:
        # The children hold their own copies; close ours so readers see EOF
        for fd in stdouts + stdins:
            if fd is not None:
                os.close(fd)

    status = 0
    for child in children:
        code = child.wait()
        if not status:
            status = code
    if status:
        raise IOError(f'Generator returned exit status {status}')


def _unpack_one(testname):
    '''Unpack the specified test into WORKROOT, fetching its base slide
    first if needed.  Files already present, and files with a whiteout
    marker, are left alone.  Each remaining file is produced by the
    configured generator, by applying its xdelta, or as a symlink to the
    pristine file, in that order of preference.'''

    conf = _load_test_config(testname)
    slide = conf['base']
    renames = conf.get('rename', {})
    generators = conf.get('generate', {})
    announced = False
    _fetch_one(slide)

    for relpath in _list_slide_files(slide):
        basename = os.path.basename(relpath)
        origpath = os.path.join(PRISTINE, slide, relpath)
        newpath = os.path.join(WORKROOT, testname,
                renames.get(basename, relpath))
        deltapath = os.path.join(CASEROOT, testname, basename + '.xdelta')
        whiteoutpath = os.path.join(CASEROOT, testname,
                basename + '.whiteout')

        if os.path.exists(newpath) or os.path.exists(whiteoutpath):
            continue

        # Only announce tests that actually need work
        if not announced:
            print(f'Unpacking {testname}...')
            announced = True

        newdir = os.path.dirname(newpath)
        os.makedirs(newdir, exist_ok=True)

        generator = generators.get(basename)
        if generator:
            _run_generator(generator, origpath, newpath)
        elif os.path.exists(deltapath):
            subprocess.check_call(['xdelta3', 'decode', '-s',
                    origpath, deltapath, newpath])
        else:
            # Unmodified file: link back to the pristine copy
            symlink(os.path.relpath(origpath, newdir), newpath)


@_command
def unpack(pattern='*'):
    '''Unpack all tests matching the specified pattern.  If pattern is
    `nonfrozen`, unpack tests for which we don't have a frozen counterpart.'''
    only_nonfrozen = pattern == 'nonfrozen'
    if only_nonfrozen:
        pattern = '*'
    for testname in _list_tests(pattern):
        if only_nonfrozen and os.path.exists(os.path.join(FROZEN, testname)):
            continue
        _unpack_one(testname)


# Read-only FUSE filesystem that proxies a backing directory tree and builds
# a shadow directory tree of sparse files containing only the accessed
# bytes.
#
# _fusefs_init() and _fusefs_run() are based on pyfuse3's
# examples/passthroughfs.py, copyright © Nikolaus Rath <Nikolaus.org>
def _fusefs_init(shadowdir):
    '''Prepare a FUSE filesystem to run, and mount it.

    The filesystem is mounted read-only at FUSEMOUNT and proxies WORKROOT,
    with PRISTINE additionally exposed as "_pristine" in the root.
    shadowdir is the directory in which the shadow tree is built: files
    that are opened get a same-sized shadow file into which only the bytes
    actually read are copied, and symlinks that are read get a shadow
    symlink.'''
    # trio, imported by pyfuse3, tries to override sys.excepthook and
    # complains if the distro already put something there.  We don't need
    # the distro's error reporting, so reset the excepthook to default.
    sys.excepthook = sys.__excepthook__

    from pyfuse3 import FUSEError
    import pyfuse3

    class Operations(pyfuse3.Operations):
        def __init__(self):
            super().__init__()
            # inode -> (relative path within FUSE FS, backing path on disk)
            self._inode_path_map = { pyfuse3.ROOT_INODE: ('.', WORKROOT) }
            # inode -> number of outstanding lookups, decremented by forget()
            self._lookup_cnt = defaultdict(lambda: 0)
            # Open-file bookkeeping.  Each inode has at most one backing fd,
            # shared by all concurrent opens and counted in _fd_open_count.
            self._fd_inode_map = dict()
            self._inode_fd_map = dict()
            # backing fd -> shadow file object receiving the bytes read
            self._fd_shadow_map = dict()
            self._fd_open_count = dict()

        def _inode_to_paths(self, inode):
            '''Return the (relative path, backing path) pair recorded for
            inode, or raise FUSEError(ENOENT) if it is unknown.'''
            try:
                return self._inode_path_map[inode]
            except KeyError:
                raise FUSEError(errno.ENOENT)

        def _add_paths(self, inode, relpath, backingpath):
            '''Count one more lookup of inode and record its paths, keeping
            any paths already recorded for it.'''
            self._lookup_cnt[inode] += 1
            self._inode_path_map.setdefault(inode, (relpath, backingpath))

        def _shadowpath(self, relpath):
            '''Return the path under shadowdir corresponding to relpath,
            creating its parent directories if needed.'''
            shadowpath = os.path.join(shadowdir, relpath)
            os.makedirs(os.path.dirname(shadowpath), exist_ok=True)
            return shadowpath

        async def forget(self, inode_list):
            '''Drop nlookup lookups from each listed inode, discarding the
            inode's bookkeeping once none remain.'''
            for (inode, nlookup) in inode_list:
                if self._lookup_cnt[inode] > nlookup:
                    self._lookup_cnt[inode] -= nlookup
                    continue
                assert inode not in self._inode_fd_map
                del self._lookup_cnt[inode]
                try:
                    del self._inode_path_map[inode]
                except KeyError: # may have been deleted
                    pass

        async def lookup(self, inode_p, name, ctx=None):
            '''Return the attributes of the entry called name within the
            directory inode_p, recording its paths for later calls.'''
            name = os.fsdecode(name)
            relpath, backingpath = [os.path.join(p, name) for p in
                    self._inode_to_paths(inode_p)]
            # Special case: map PRISTINE into the filesystem even though it's
            # outside WORKROOT
            if inode_p == pyfuse3.ROOT_INODE and name == '_pristine':
                backingpath = PRISTINE
            attr = self._getattr(backingpath)
            if name != '.' and name != '..':
                self._add_paths(attr.st_ino, relpath, backingpath)
            return attr

        async def getattr(self, inode, ctx=None):
            '''Return the attributes of the backing file for inode.'''
            _, backingpath = self._inode_to_paths(inode)
            return self._getattr(backingpath)

        def _getattr(self, backingpath):
            '''lstat() backingpath and return the result as EntryAttributes,
            raising FUSEError with the OS errno on failure.'''
            try:
                stat = os.lstat(backingpath)
            except OSError as exc:
                raise FUSEError(exc.errno)
            entry = pyfuse3.EntryAttributes()
            for attr in ('st_ino', 'st_mode', 'st_nlink', 'st_uid', 'st_gid',
                         'st_rdev', 'st_size', 'st_atime_ns', 'st_mtime_ns',
                         'st_ctime_ns'):
                setattr(entry, attr, getattr(stat, attr))
            entry.generation = 0
            # Zero timeouts; presumably to keep the kernel from caching
            # entries and attributes -- TODO confirm against pyfuse3 docs
            entry.entry_timeout = 0
            entry.attr_timeout = 0
            # Block count is derived from the size, rounded up to whole
            # 512-byte blocks, rather than copied from the backing file
            entry.st_blksize = 512
            entry.st_blocks = ((entry.st_size+entry.st_blksize-1) // entry.st_blksize)
            return entry

        async def readlink(self, inode, ctx):
            '''Return the target of the symlink at inode, rewritten to stay
            inside the FUSE root if it points into PRISTINE, and create the
            same link in the shadow tree if it is not already there.'''
            relpath, backingpath = self._inode_to_paths(inode)
            try:
                target = os.readlink(backingpath)
                # calculate target relative to the root of the FUSE FS
                reltarget = os.path.normpath(
                        os.path.join(os.path.dirname(relpath), target))
                # calculate path to PRISTINE relative to WORKROOT
                relpristine = os.path.relpath(PRISTINE, WORKROOT)
                if reltarget.startswith(relpristine):
                    # link points to pristine directory; repoint it inside
                    # the FUSE root
                    reltarget = reltarget.replace(relpristine, '_pristine')
                    target = os.path.relpath(reltarget,
                            os.path.dirname(relpath))
                shadowpath = self._shadowpath(relpath)
                if not os.path.lexists(shadowpath):
                    symlink(target, shadowpath)
            except OSError as exc:
                raise FUSEError(exc.errno)
            return os.fsencode(target)

        async def opendir(self, inode, ctx):
            '''Use the inode itself as the directory handle.'''
            return inode

        async def readdir(self, inode, off, token):
            '''Report the entries of the directory at inode in ascending
            inode order, resuming after the entry whose inode is off.'''
            reldir, backingdir = self._inode_to_paths(inode)
            entries = []
            if inode == pyfuse3.ROOT_INODE:
                # Insert _pristine into the root dir
                attr = self._getattr(PRISTINE)
                entries.append((attr.st_ino, '_pristine',
                        os.path.join(reldir, '_pristine'), PRISTINE, attr))
            for name in os.listdir(backingdir):
                if name == '.' or name == '..':
                    continue
                relpath = os.path.join(reldir, name)
                backingpath = os.path.join(backingdir, name)
                attr = self._getattr(backingpath)
                entries.append((attr.st_ino, name, relpath, backingpath, attr))
            # The inode number doubles as the readdir offset, so entries are
            # sorted by inode and those at or below off are skipped
            for (ino, name, relpath, backingpath, attr) in sorted(entries):
                if ino <= off:
                    continue
                if not pyfuse3.readdir_reply(
                    token, os.fsencode(name), attr, ino):
                    break
                # Only entries actually delivered are counted as looked up
                self._add_paths(attr.st_ino, relpath, backingpath)

        async def statfs(self, ctx):
            '''Return filesystem statistics copied from statvfs(WORKROOT),
            with the maximum name length reduced by the length of the
            WORKROOT prefix.'''
            stat_ = pyfuse3.StatvfsData()
            try:
                statfs = os.statvfs(WORKROOT)
            except OSError as exc:
                raise FUSEError(exc.errno)
            for attr in ('f_bsize', 'f_frsize', 'f_blocks', 'f_bfree', 'f_bavail',
                         'f_files', 'f_ffree', 'f_favail'):
                setattr(stat_, attr, getattr(statfs, attr))
            stat_.f_namemax = statfs.f_namemax - (len(WORKROOT)+1)
            return stat_

        async def open(self, inode, flags, ctx):
            '''Open the backing file for inode, together with a shadow file
            of the same size, and return a FileInfo whose handle is the
            backing fd.  An inode that is already open reuses its fd.'''
            # set direct_io to avoid kernel readahead
            if inode in self._inode_fd_map:
                fd = self._inode_fd_map[inode]
                self._fd_open_count[fd] += 1
                return pyfuse3.FileInfo(fh=fd, direct_io=True)
            assert flags & os.O_CREAT == 0
            try:
                relpath, backingpath = self._inode_to_paths(inode)
                fd = os.open(backingpath, flags)
                shadow = os.fdopen(os.open(self._shadowpath(relpath),
                        os.O_WRONLY | os.O_CREAT, 0o644), 'wb')
                # Size the shadow file to match the backing file (the seek
                # to the end returns the backing file's length)
                shadow.truncate(os.lseek(fd, 0, os.SEEK_END))
            except OSError as exc:
                raise FUSEError(exc.errno)
            self._inode_fd_map[inode] = fd
            self._fd_inode_map[fd] = inode
            self._fd_shadow_map[fd] = shadow
            self._fd_open_count[fd] = 1
            return pyfuse3.FileInfo(fh=fd, direct_io=True)

        async def read(self, fd, offset, length):
            '''Read up to length bytes at offset from the backing file,
            copy them to the same offset in the shadow file, and return
            them.'''
            os.lseek(fd, offset, os.SEEK_SET)
            buf = os.read(fd, length)
            shadow = self._fd_shadow_map[fd]
            shadow.seek(offset)
            shadow.write(buf)
            return buf

        async def release(self, fd):
            '''Drop one open of fd; when it was the last, forget the fd and
            close both the backing file and its shadow file.'''
            if self._fd_open_count[fd] > 1:
                self._fd_open_count[fd] -= 1
                return

            del self._fd_open_count[fd]
            inode = self._fd_inode_map[fd]
            del self._inode_fd_map[inode]
            del self._fd_inode_map[fd]
            shadow = self._fd_shadow_map.pop(fd)
            try:
                os.close(fd)
                shadow.close()
            except OSError as exc:
                raise FUSEError(exc.errno)

    # ENOTCONN from stat() is taken to mean a stale mount left by an earlier
    # run; other errors (such as the mountpoint not existing yet) are ignored
    try:
        os.stat(FUSEMOUNT)
    except OSError as e:
        if e.errno == errno.ENOTCONN:
            # Clean up old mountpoint
            subprocess.check_call(['fusermount', '-u', FUSEMOUNT])
    os.makedirs(FUSEMOUNT, exist_ok=True)
    fuse_options = set(pyfuse3.default_options)
    fuse_options.update(['fsname=openslide', 'ro'])
    pyfuse3.init(Operations(), FUSEMOUNT, fuse_options)


def _fusefs_run():
    '''Run an initialized FUSE filesystem until its main loop exits, then
    close it.  If the main loop raises, close without unmounting and
    re-raise.'''
    import pyfuse3
    import trio
    try:
        trio.run(pyfuse3.main)
    except BaseException:
        pyfuse3.close(unmount=False)
        raise
    else:
        pyfuse3.close()


def _s3_upload_public(bucket, key, fh, content_type='application/octet-stream'):
    '''Upload the contents of fh to the specified S3 bucket and key as a
    publicly readable object with the given content type, printing
    progress along the way.  If the object already exists, skip the
    upload.  Return the object's public URL either way.'''
    import boto3
    client = boto3.client('s3')

    # Calculate public URL
    location = client.get_bucket_location(Bucket=bucket)['LocationConstraint']
    region = location or 'us-east-1'
    url = f'https://{bucket}.s3.dualstack.{region}.amazonaws.com/{key}'

    # Skip re-uploading existing objects
    try:
        client.head_object(Bucket=bucket, Key=key)
    except client.exceptions.ClientError as e:
        # https://github.com/boto/boto3/issues/2442
        if e.response['Error']['Code'] != '404':
            raise
    else:
        return url

    # Set up progress reporting
    length = fh.seek(0, os.SEEK_END)
    state = {'last_update': 0, 'uploaded': 0}

    def progress(count):
        state['uploaded'] += count
        now = curtime()
        # Refresh the progress line at most once per second
        if now - state['last_update'] >= 1:
            print(f'Uploading {state["uploaded"] >> 20}/{length >> 20} MB...\r',
                    end='')
            sys.stdout.flush()
            state['last_update'] = now

    # Upload object
    fh.seek(0)
    extra_args = {
        'ACL': 'public-read',
        'ContentType': content_type
    }
    client.upload_fileobj(fh, bucket, key, Callback=progress,
            ExtraArgs=extra_args)
    print('{:<79}'.format(f'Uploaded {length >> 20} MB'))
    return url


@_command
def freeze(bucket=DEFAULT_FROZEN_BUCKET):
    '''Create a frozen testdata archive for transport to another system,
    upload it to an S3 bucket, and record its URL in the source tree.

    All tests are unpacked, then every freezable test is run against a
    FUSE mount that records the bytes read into a shadow directory.  The
    shadow directory is archived as a sparse, xz-compressed tarball, which
    is uploaded under its own SHA-256 as the key.  The resulting URL and
    hash are written to FROZENLIST.'''
    for testname in _list_tests():
        _unpack_one(testname)

    print('Running tests...')
    exclude_tests = []
    with TemporaryDirectory(prefix='shadow-', dir=FROZENBASE) as tempdir:
        _fusefs_init(tempdir)
        # Serve the filesystem from a background thread while the tests run
        # in this one
        Thread(name='fuse', target=_fusefs_run, daemon=False).start()
        # Run all tests that we might want to run on thawed testdata
        for testname in _list_tests():
            conf = _load_test_config(testname)
            if conf.get('freezable', True):
                _run_one(testname, workdir=FUSEMOUNT)
                # Ensure we at least have a shadow directory for the test.
                # Otherwise, tests that solely test opening a missing file
                # in a multi-file format will cause a base slide fetch and
                # unpack when run against frozen data.
                os.makedirs(os.path.join(tempdir, testname), exist_ok=True)
            else:
                exclude_tests.append(testname)
        # Run _mosaic (defined elsewhere in this file) against the pristine
        # slides as seen through the mount; its output file is discarded
        with NamedTemporaryFile(prefix='openslide-') as fh:
            _mosaic(fh.name, os.path.join(FUSEMOUNT, '_pristine'))
        subprocess.check_call(['fusermount', '-u', FUSEMOUNT])

        print('Freezing testdata...')
        # Delete spurious shadow directories from any incidental filesystem
        # access to skipped tests
        for testname in exclude_tests:
            path = os.path.join(tempdir, testname)
            if os.path.exists(path):
                rmtree(path)
        # Python tarfile doesn't support creating sparse archives
        proc = subprocess.Popen(['gtar', 'c', '--sparse',
                # Make archive reproducible
                '--owner=0', '--group=0', '--mtime=@0', '--sort=name',
                '-C', tempdir, '.'], stdout=subprocess.PIPE)
        compressor = LZMACompressor(preset=9)
        hasher = sha256()
        with TemporaryFile(prefix='frozen-', dir=FROZENBASE) as fh:
            # Hash the compressed stream as it is written, so the digest
            # describes the archive exactly as uploaded
            def update(buf):
                hasher.update(buf)
                fh.write(buf)
            while True:
                buf = proc.stdout.read(1 << 20)
                if not buf:
                    break
                update(compressor.compress(buf))
            update(compressor.flush())
            proc.wait()
            if proc.returncode != 0:
                raise IOError('tar failed')
            digest = hasher.hexdigest()

            print('Uploading archive...')
            # The digest doubles as the object key
            url = _s3_upload_public(bucket, digest, fh,
                    content_type='application/x-xz')

    manifest = {
        'url': url,
        'sha256': digest,
    }
    with open(FROZENLIST, 'w') as fh:
        yaml.safe_dump(manifest, fh, default_flow_style=False)


@_command
def unfreeze():
    '''Download and unpack the current frozen archive.

    The archive named in FROZENLIST is unpacked into a directory under
    FROZENBASE named after its SHA-256, and the FROZEN symlink is pointed
    at it.  If FROZEN already points at that archive, do nothing; if it
    points at a different one, remove the link and the old directory.

    Raise IOError if the download does not match the recorded hash, or if
    the archive contains a path escaping the target directory or a member
    that is not a regular file, directory, or symlink.  On any failure the
    partially unpacked directory is removed so a later run can start
    cleanly.'''
    with open(FROZENLIST) as fh:
        manifest = yaml.safe_load(fh)
    expected_digest = manifest['sha256']
    link_target = 'frozen-' + expected_digest
    if os.path.lexists(FROZEN):
        current = os.readlink(FROZEN)
        if current == link_target:
            return
        os.unlink(FROZEN)
        olddir = os.path.join(FROZENBASE, current)
        if os.path.exists(olddir):
            rmtree(olddir)
    basedir = os.path.join(FROZENBASE, link_target)
    # A run that failed before the symlink was created may have left this
    # directory behind; start over rather than failing in makedirs()
    if os.path.exists(basedir):
        rmtree(basedir)
    os.makedirs(basedir)
    try:
        with TemporaryFile(dir=FROZENBASE, prefix='frozen-') as fh:
            found_digest = _download(manifest['url'], 'frozen archive', fh)
            if found_digest != expected_digest:
                raise IOError(f'Hash mismatch: expected {expected_digest}, '
                        f'found {found_digest}')
            fh.seek(0)
            print('Unpacking...')
            with tarfile.open(fileobj=fh) as tf:
                # next() returns None once the archive is exhausted
                for info in iter(tf.next, None):
                    # directory traversal shouldn't happen because we check
                    # the tarball hash, but check anyway
                    normalized = os.path.normpath(info.name)
                    if (normalized == '..' or normalized.startswith('/')
                            or normalized.startswith('../')):
                        raise IOError(f'Directory traversal: {info.name}')
                    if info.isfile() or info.isdir():
                        tf.extract(info, path=basedir, set_attrs=False)
                    elif info.issym():
                        # Go through symlink() rather than tarfile so the
                        # platform-specific implementation is used
                        symlink(info.linkname,
                                os.path.join(basedir, info.name))
                    else:
                        raise IOError(f'Unexpected type: {info.name}')
    except BaseException:
        rmtree(basedir, ignore_errors=True)
        raise
    symlink(link_target, FROZEN)


def _run_one(testname, valgrind=False, xfail=False, testdir=None,
        workdir=WORKROOT):
    '''Run a single test case and print a one-line verdict.

    The slide is opened (under Valgrind if requested) and the outcome is
    compared with the expectation recorded in the test configuration.  A
    primary case that is expected to open, and does, additionally gets the
    extended tests.  With xfail set, the pass/fail sense is inverted.
    Return True if the (possibly inverted) result is acceptable or the
    test was skipped for lack of a required feature.'''

    conf = _load_test_config(testname)
    if not _features_available(conf):
        print(_color(BLUE, f'{testname}: skipped'))
        return True

    slidefile = os.path.join(workdir, testname, conf['slide'])
    error = _try_open_slide(slidefile, valgrind, testdir,
            vendor=conf.get('vendor', None),
            properties=conf.get('properties', {}),
            regions=conf.get('regions', []),
            debug=conf.get('debug', []))
    expect_success = conf['success']

    # Work out what, if anything, went wrong
    failure = None
    if error is None:
        if not expect_success:
            failure = f'{testname}: unexpected success'
        elif conf.get('primary', False):
            extended_error = _try_extended(slidefile, valgrind, testdir)
            if extended_error:
                failure = (f'{testname}: extended test failed: '
                        f'{extended_error}')
    elif expect_success:
        failure = f'{testname}: unexpected failure: {error}'
    elif not re.search(conf['error'], error):
        failure = f'{testname}: incorrect error: {error}'

    ok = failure is None
    if ok:
        msg = _color(GREEN, f'{testname}: OK')
    else:
        msg = _color(RED, failure)

    if xfail:
        # An expected failure counts as a pass, and vice versa
        ok = not ok
        if ok:
            msg = _color(BLUE, f'{testname}: failed as expected')
        else:
            msg = _color(RED, f'{testname}: expected to fail, but passed')

    print(msg)
    return ok


def _run_all(pattern='*', valgrind=False, xfail=None, testdir=None):
    '''Run every test whose name matches pattern, optionally under
    Valgrind.  xfail lists the tests that are expected to fail.  Print a
    summary line and return the count of tests with unexpected results.'''
    names = _list_tests(pattern)

    # Unpack whatever the frozen tree does not already provide
    for name in names:
        frozen_copy = os.path.join(FROZEN, name)
        if not os.path.exists(frozen_copy):
            _unpack_one(name)

    expected_failures = set(xfail or [])
    failed = 0
    for name in names:
        # Prefer the real test if available
        unpacked = os.path.exists(os.path.join(WORKROOT, name))
        workdir = WORKROOT if unpacked else FROZEN
        passed = _run_one(name, valgrind, name in expected_failures,
                testdir, workdir=workdir)
        if not passed:
            failed += 1

    print(f'\nFailed: {failed}/{len(names)}')
    return failed


@_command
def run(pattern='*'):
    '''Unpack and run all tests matching the specified pattern.  Ignore
    failures of test cases listed in the comma-separated
    OPENSLIDE_TEST_XFAIL environment variable.'''
    # An unset or empty variable means no expected failures
    listed = os.environ.get('OPENSLIDE_TEST_XFAIL')
    expected_failures = listed.split(',') if listed else []
    unexpected = _run_all(pattern, xfail=expected_failures)
    if unexpected:
        # Non-zero count of unexpected results: report failure to the shell
        sys.exit(1)


@contextmanager
def _rebuild(configure_args):
    '''Context manager: rebuild the source with the specified configure
    arguments (e.g. a CFLAGS=... assignment) and yield to the caller to
    inspect or exercise the build.

    configure_args is a list appended to the ./configure command line.
    While the body of the with statement runs, the current directory is
    the top of the freshly built tree; the yielded value is the directory
    that was current on entry.  The original directory is restored on
    exit, whether or not an exception occurred.  subprocess errors from
    make/configure propagate to the caller.'''
    # To minimize collateral damage, unpack the dist tarball into a temp
    # directory and build there.

    top_builddir = os.path.dirname('!!BUILDDIR!!')
    prevdir = os.getcwd()

    # Make tarball.  Run make in top_builddir without changing our own
    # working directory, so a failure here leaves it untouched.
    subprocess.check_call(['make', 'dist-gzip'], cwd=top_builddir)
    tarpath = os.path.join(top_builddir, 'openslide-!!VERSION!!.tar.gz')

    tempdir = mkdtemp(prefix='build-', dir=prevdir)
    try:
        # Unpack and remove the tarball
        os.chdir(tempdir)
        with tarfile.open(tarpath, 'r:gz') as tf:
            tf.extractall()
        os.unlink(tarpath)
        os.chdir('openslide-!!VERSION!!')

        # Build with specified configure arguments
        subprocess.check_call(['./configure'] + configure_args)
        subprocess.check_call(['make'])

        # Let the caller run, passing it the directory we came from.
        yield prevdir
    finally:
        # Always return to where we started, even on failure
        os.chdir(prevdir)

    # Remove temporary directory.
    # Intentionally don't clean up tempdir on exception, so a failed build
    # can be examined; this line is only reached on success.
    rmtree(tempdir)


@_command
def coverage(outfile):
    '''Unpack and run all tests and write coverage report to outfile.'''
    # Rebuilds the library with gcov instrumentation, runs the whole test
    # suite against it, and writes every unexecuted line (with two lines of
    # context) to outfile.  outfile is joined onto the directory yielded by
    # _rebuild.  Raises IOError if grep exits with a non-zero status.
    with _rebuild(['CFLAGS=-O0 -g -fprofile-arcs -ftest-coverage']) as basedir:
        # Run tests
        _run_all(testdir='test')

        # Generate coverage reports
        for dirpath, _dirnames, filenames in os.walk('src'):
            paths = [os.path.join(dirpath, name)
                    for name in fnmatch.filter(sorted(filenames), '*.gcda')]
            if paths:
                subprocess.check_call(['gcov', '-o', dirpath] + paths)

        # Record unexecuted lines
        proc = subprocess.Popen(['grep', '-FC', '2', '#####'] +
                fnmatch.filter(sorted(os.listdir('.')), '*.gcov'),
                stdout=subprocess.PIPE)
        report, _ = proc.communicate()
        if proc.returncode:
            raise IOError(f'Process returned exit status {proc.returncode}')
        # communicate() hands back bytes here.  Process and write the report
        # as bytes so that source lines in any encoding pass through intact.
        report = b'\n'.join(line.replace(b'.c.gcov', b'.c', 1)
                for line in report.split(b'\n'))
        with open(os.path.join(basedir, outfile), 'wb') as fh:
            fh.write(report)


@_command
def valgrind(pattern='*'):
    '''Unpack and Valgrind all tests matching the specified pattern.
    Ignore failures of test cases listed in the OPENSLIDE_VALGRIND_XFAIL
    environment variable.'''
    # An unset or empty variable means no expected failures
    listed = os.environ.get('OPENSLIDE_VALGRIND_XFAIL')
    expected_failures = listed.split(',') if listed else []
    unexpected = _run_all(pattern, valgrind=True, xfail=expected_failures)
    if unexpected:
        # Non-zero count of unexpected results: report failure to the shell
        sys.exit(1)


def _mosaic(outfile, pristinedir=PRISTINE):
    '''Fetch the slides named in MOSAICLIST and run the mosaic tool to
    write a mosaic image to outfile, reading slide data from
    pristinedir.'''
    parser = RawConfigParser()
    # Keep option names case-sensitive
    parser.optionxform = str
    parser.read(MOSAICLIST)

    # Make sure every referenced slide has been fetched
    for section in parser.sections():
        _fetch_one(parser.get(section, 'base'))

    tool = os.path.join('!!BUILDDIR!!', 'mosaic')
    tool_args = [native_path(p) for p in (pristinedir, MOSAICLIST, outfile)]
    subprocess.check_call([tool] + tool_args)


@_command
def mosaic(outfile):
    '''Produce a mosaic image of slide data from various formats.'''
    # Use the frozen copy of the pristine data when a frozen tree exists
    have_frozen = os.path.exists(FROZEN)
    source = os.path.join(FROZEN, '_pristine') if have_frozen else PRISTINE
    _mosaic(outfile, source)


def _successful_primary_tests(pattern='*'):
    '''Generate (testname, slide path) pairs for the tests matching
    pattern that are marked primary, are expected to succeed, and have all
    their required features available.  Each selected test is unpacked
    before it is yielded.'''
    for name in _list_tests(pattern):
        conf = _load_test_config(name)
        selected = (conf.get('primary', False) and conf['success']
                and _features_available(conf))
        if selected:
            _unpack_one(name)
            yield name, os.path.join(WORKROOT, name, conf['slide'])


@_command
def time(pattern='*'):
    '''Time openslide_open() for all successful primary tests matching the
    specified pattern.'''
    for testname, slidefile in _successful_primary_tests(pattern):
        proc = _launch_test('try_open', slidefile, args=['-t'],
                extra_checks=False, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE, text=True)
        stdout, stderr = proc.communicate()
        # A non-zero exit status or any stderr output counts as a failure
        broken = proc.returncode or stderr
        shown = 'failed' if broken else stdout
        print(f'{testname:<40} {shown.strip():<10}')


@_command
def profile(pattern='*', level=0):
    '''Profile openslide_read_region() on the specified level for all
    successful primary tests matching the specified pattern.'''
    # Child environment: the current one plus the two debug settings
    child_env = dict(os.environ, G_MESSAGES_DEBUG='',
            OPENSLIDE_DEBUG='performance')
    banner = '#' * 79
    for testname, slidefile in _successful_primary_tests(pattern):
        print(f'{banner}\n# {testname}\n{banner}\n')
        with NamedTemporaryFile(prefix='openslide-callgrind-') as outfh:
            valgrind_cmd = [
                '!!BUILDDIR!!/../libtool', '--mode=execute',
                'valgrind', '--quiet', '--error-exitcode=3',
                '--tool=callgrind', '--callgrind-out-file=' + outfh.name,
                '--instr-atstart=no',
                '!!BUILDDIR!!/profile', slidefile, str(level),
            ]
            status = subprocess.call(valgrind_cmd, env=child_env)
            # Only annotate when the profiling run itself succeeded
            if status == 0:
                subprocess.check_call(['callgrind_annotate',
                        '--threshold=80', outfh.name])


@_command
def exports():
    '''Report exported or hidden symbols with improper names.'''
    # Exported symbols must start with 'openslide_'.  Symbols that only
    # become visible when the library is rebuilt with
    # gl_cv_cc_visibility=no must start with '_openslide_'.  Offending
    # names are printed to stderr and the process exits with status 1.
    def get_symbols(basedir):
        # Yield the name of each global ('g') entry that objdump -T lists
        # for the libopenslide.so built under basedir.  Raises IOError if
        # objdump fails.
        #
        # text=True is required: with bytes output, the comparison against
        # the str 'g' below is never equal, so every line would be skipped
        # and the check would silently report nothing.
        proc = subprocess.Popen(['objdump', '-T',
                os.path.join(basedir, 'src', '.libs', 'libopenslide.so')],
                stdout=subprocess.PIPE, text=True)
        out, _ = proc.communicate()
        if proc.returncode:
            raise IOError(f'objdump returned exit status {proc.returncode}')
        for line in out.splitlines():
            words = line.split()
            if len(words) < 3:
                continue
            if words[1] != 'g':
                # Not a global symbol
                continue
            yield words[-1]

    # Magic ELF symbols
    ignore_symbols = {'__bss_start', '_edata', '_end', '_fini', '_init'}

    # Check exported symbols
    exported_symbols = set(get_symbols('..')) - ignore_symbols
    bad_exported = {symbol for symbol in exported_symbols
            if not symbol.startswith('openslide_')}

    # Check hidden symbols
    with _rebuild(['gl_cv_cc_visibility=no']):
        hidden_symbols = (set(get_symbols('.')) - exported_symbols -
                ignore_symbols)
        bad_hidden = {symbol for symbol in hidden_symbols
                if not symbol.startswith('_openslide_')
                and symbol != 'openslide_cairo_read_region'}  # legacy

    # Report
    for symbol in sorted(bad_exported):
        print('Badly-named exported symbol:', symbol, file=sys.stderr)
    for symbol in sorted(bad_hidden):
        print('Badly-named hidden symbol:', symbol, file=sys.stderr)
    if bad_exported or bad_hidden:
        sys.exit(1)


@_command
def clean(pattern='*'):
    '''Delete temporary slide data for tests matching the specified
    pattern.  If pattern is `frozen` or omitted, also delete unfrozen
    test data.'''
    for testname in _list_tests(pattern):
        unpacked = os.path.join(WORKROOT, testname)
        if os.path.exists(unpacked):
            rmtree(unpacked)

    # The frozen tree is only touched for the default or 'frozen' pattern
    if pattern not in ('*', 'frozen'):
        return
    if not os.path.lexists(FROZEN):
        return
    # Resolve the link before removing it, then delete what it pointed to
    target = os.path.join(FROZENBASE, os.readlink(FROZEN))
    os.unlink(FROZEN)
    if os.path.exists(target):
        rmtree(target)


@_command
def fuse():
    '''Mount the FUSE filesystem for debugging.'''
    # The shadow directory exists only for the lifetime of the mount
    shadow = TemporaryDirectory(prefix='shadow-', dir=FROZENBASE)
    with shadow as shadowdir:
        _fusefs_init(shadowdir)
        _fusefs_run()


def _get_arglist(f):
    '''Return two lists of argument names for the specified function: the
    mandatory arguments and the optional ones.'''
    info = inspect.getfullargspec(f)
    if info.defaults:
        optcount = len(info.defaults)
        return info.args[:-optcount], info.args[-optcount:]
    else:
        return info.args, []


def _usage():
    '''Print a synopsis and description of every command, then exit with
    status 2.'''
    indent = ' ' * 8
    wrapper = textwrap.TextWrapper(width=76, initial_indent=indent,
            subsequent_indent=indent)
    print('Usage:')
    for name in _commands:
        func = _command_funcs[name]
        required, optional = _get_arglist(func)
        # <arg> marks a mandatory argument, [arg] an optional one
        placeholders = [f'<{arg}>' for arg in required]
        placeholders += [f'[{arg}]' for arg in optional]
        synopsis = ' '.join(placeholders)
        print(f'    {name} {synopsis}')
        print(wrapper.fill(func.__doc__ or 'Undocumented.'))
        print()
    sys.exit(2)


def _main():
    '''Dispatch to the command named by the first command-line argument,
    passing the remaining arguments to it as strings.  Print usage and
    exit if the command is missing or unknown, or if the number of
    arguments does not fit the command's signature.'''
    try:
        handler = _command_funcs[sys.argv[1]]
    except (IndexError, KeyError):
        # No command given, or not one we know
        _usage()
    required, optional = _get_arglist(handler)
    supplied = sys.argv[2:]
    fewest = len(required)
    most = fewest + len(optional)
    if not fewest <= len(supplied) <= most:
        _usage()
    handler(*supplied)


# Run the command dispatcher only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    _main()
